Solutions/CiscoDuoSecurity/Data Connectors/AzureFunctionCiscoDuo/main.py (373 lines of code) (raw):
import os
import logging
import time
import re
from typing import Iterable, List
import duo_client
import azure.functions as func
from .sentinel_connector import AzureSentinelConnector
from .state_manager import StateManager
logging.getLogger('azure.core.pipeline.policies.http_logging_policy').setLevel(logging.ERROR)
CISCO_DUO_INTEGRATION_KEY = os.environ['CISCO_DUO_INTEGRATION_KEY']
CISCO_DUO_SECRET_KEY = os.environ['CISCO_DUO_SECRET_KEY']
CISCO_DUO_API_HOSTNAME = os.environ['CISCO_DUO_API_HOSTNAME']
WORKSPACE_ID = os.environ['WORKSPACE_ID']
SHARED_KEY = os.environ['SHARED_KEY']
FILE_SHARE_CONN_STRING = os.environ['AzureWebJobsStorage']
LOG_TYPE = 'CiscoDuo'
MAX_SYNC_WINDOW_PER_RUN_MINUTES = os.getenv('MAX_SYNC_WINDOW_PER_RUN_MINUTES', "60")
MAX_SCRIPT_EXEC_TIME_MINUTES = 10
LOG_ANALYTICS_URI = os.environ.get('logAnalyticsUri')
if not LOG_ANALYTICS_URI or str(LOG_ANALYTICS_URI).isspace():
LOG_ANALYTICS_URI = 'https://' + WORKSPACE_ID + '.ods.opinsights.azure.com'
pattern = r'https:\/\/([\w\-]+)\.ods\.opinsights\.azure.([a-zA-Z\.]+)$'
match = re.match(pattern, str(LOG_ANALYTICS_URI))
if not match:
raise Exception("Invalid Log Analytics Uri.")
def main(mytimer: func.TimerRequest) -> None:
logging.info('Starting script')
start_ts = int(time.time())
admin_api = duo_client.Admin(
ikey=CISCO_DUO_INTEGRATION_KEY,
skey=CISCO_DUO_SECRET_KEY,
host=CISCO_DUO_API_HOSTNAME,
)
sentinel = AzureSentinelConnector(
log_analytics_uri=LOG_ANALYTICS_URI,
workspace_id=WORKSPACE_ID,
shared_key=SHARED_KEY,
log_type=LOG_TYPE,
queue_size=5000
)
log_types = get_log_types()
if 'trust_monitor' in log_types:
state_manager = StateManager(FILE_SHARE_CONN_STRING, file_path='cisco_duo_trust_monitor_logs_last_ts.txt')
process_trust_monitor_events(admin_api, state_manager=state_manager, sentinel=sentinel)
if check_if_script_runs_too_long(start_ts):
logging.info('Script is running too long. Saving progress and exit.')
return
if 'authentication' in log_types:
state_manager = StateManager(FILE_SHARE_CONN_STRING, file_path='cisco_duo_auth_logs_last_ts.txt')
process_auth_logs(admin_api, start_ts, state_manager=state_manager, sentinel=sentinel)
if check_if_script_runs_too_long(start_ts):
logging.info('Script is running too long. Saving progress and exit.')
return
if 'administrator' in log_types:
state_manager = StateManager(FILE_SHARE_CONN_STRING, file_path='cisco_duo_admin_logs_last_ts.txt')
process_admin_logs(admin_api, start_ts, state_manager=state_manager, sentinel=sentinel)
if check_if_script_runs_too_long(start_ts):
logging.info('Script is running too long. Saving progress and exit.')
return
if 'telephony' in log_types:
state_manager = StateManager(FILE_SHARE_CONN_STRING, file_path='cisco_duo_tele_logs_last_ts.txt')
process_tele_logs(admin_api, start_ts, state_manager=state_manager, sentinel=sentinel)
if check_if_script_runs_too_long(start_ts):
logging.info('Script is running too long. Saving progress and exit.')
return
if 'offline_enrollment' in log_types:
state_manager = StateManager(FILE_SHARE_CONN_STRING, file_path='cisco_duo_offline_enrollment_logs_last_ts.txt')
process_offline_enrollment_logs(admin_api, start_ts, state_manager=state_manager, sentinel=sentinel)
logging.info('Script finished. Sent events: {}'.format(sentinel.successfull_sent_events_number))
def get_log_types():
res = str(os.environ.get('CISCO_DUO_LOG_TYPES', ''))
if not res:
res = 'trust_monitor,authentication,administrator,telephony,offline_enrollment'
return [x.lower().strip() for x in res.split(',')]
def process_trust_monitor_events(admin_api: duo_client.Admin, state_manager: StateManager, sentinel: AzureSentinelConnector) -> None:
logging.info('Start processing trust_monitor logs')
logging.info('Getting last timestamp')
mintime = state_manager.get()
if mintime:
logging.info('Last timestamp is {}'.format(mintime))
mintime = int(mintime) + 1
else:
logging.info('Last timestamp is not known. Getting data for last 24h')
mintime = int(time.time() - 86400) * 1000
maxtime = int(time.time() - 120) * 1000
diff = maxtime - mintime
maxwindow = int(MAX_SYNC_WINDOW_PER_RUN_MINUTES) * 60000
if(diff > maxwindow):
maxtime = mintime + maxwindow
logging.warn('Ingestion is lagging for trust_monitor logs, limiting synchronization window to {}'.format(maxwindow))
logging.info('Making trust_monitor logs request: mintime={}, maxtime={}'.format(mintime, maxtime))
for event in admin_api.get_trust_monitor_events_iterator(mintime=mintime, maxtime=maxtime):
sentinel.send(event)
sentinel.flush()
logging.info('Saving trust_monitor logs last timestamp {}'.format(maxtime))
state_manager.post(str(maxtime))
def process_auth_logs(admin_api: duo_client.Admin, start_ts, state_manager: StateManager, sentinel: AzureSentinelConnector) -> None:
limit = 1000
logging.info('Start processing authentication logs')
logging.info('Getting last timestamp')
mintime = state_manager.get()
if mintime:
logging.info('Last timestamp is {}'.format(mintime))
mintime = int(mintime) + 1
else:
logging.info('Last timestamp is not known. Getting data for last 24h')
mintime = int(time.time() - 86400) * 1000
maxtime = int(time.time() - 120) * 1000
diff = maxtime - mintime
maxwindow = int(MAX_SYNC_WINDOW_PER_RUN_MINUTES) * 60000
if(diff > maxwindow):
maxtime = mintime + maxwindow
logging.warn('Ingestion is lagging for authentication logs, limiting synchronization window to {}'.format(maxwindow))
events, next_offset = get_auth_logs(admin_api, mintime, maxtime)
for event in events:
sentinel.send(event)
sentinel.flush()
logging.info('Saving auth logs last timestamp {}'.format(maxtime))
state_manager.post(str(maxtime))
while len(events) == limit:
if next_offset and next_offset is not None:
next_offset = ','.join(next_offset)
else:
break
logging.info('Making authentication logs request: next_offset={}'.format(next_offset))
try:
response = admin_api.get_authentication_log(api_version=2, mintime=mintime, maxtime=maxtime, limit=str(limit), sort='ts:asc', next_offset=next_offset)
logging.info('Response recieved {}'.format(response))
except Exception as ex:
logging.info('Error in while loop while getting authentication logs- {}'.format(ex))
if ex.status == 429:
logging.info('429 exception occurred, trying retry after 60 seconds')
time.sleep(60)
response = admin_api.get_authentication_log(api_version=2, mintime=mintime, maxtime=maxtime, limit=str(limit), sort='ts:asc', next_offset=next_offset)
if(response is not None):
events = response['authlogs']
logging.info('Obtained {} auth events'.format(len(events)))
else:
logging.info('returned response as Null')
for event in events:
sentinel.send(event)
sentinel.flush()
logging.info('Saving auth logs last timestamp {}'.format(maxtime))
state_manager.post(str(maxtime))
if check_if_script_runs_too_long(start_ts):
logging.info('Script is running too long. Saving progress and exit.')
return
def get_auth_logs(admin_api: duo_client.Admin, mintime: int, maxtime: int):
limit = 1000
logging.info('Making authentication logs request: mintime={}, maxtime={}'.format(mintime, maxtime))
try:
res = admin_api.get_authentication_log(api_version=2, mintime=mintime, maxtime=maxtime, limit=str(limit), sort='ts:asc')
except Exception as err:
logging.info('Error while getting authentication logs- {}'.format(err))
if err.status == 429:
logging.info('429 exception occurred, trying retry after 60 seconds')
time.sleep(60)
res = admin_api.get_authentication_log(api_version=2, mintime=mintime, maxtime=maxtime, limit=str(limit), sort='ts:asc')
if(res is not None):
events = res['authlogs']
next_offset = res['metadata']['next_offset']
logging.info('Obtained {} auth events'.format(len(events)))
else:
logging.info('Error while getting authentication logs')
events = None
next_offset = None
return events, next_offset
def process_admin_logs(admin_api: duo_client.Admin, start_ts, state_manager: StateManager, sentinel: AzureSentinelConnector) -> None:
limit = 1000
logging.info('Start processing administrator logs')
logging.info('Getting last timestamp')
mintime = state_manager.get()
if mintime:
logging.info('Last timestamp is {}'.format(mintime))
mintime = int(mintime) + 1
else:
logging.info('Last timestamp is not known. Getting data for last 24h')
mintime = int(time.time() - 86400)
last_ts = None
events = get_admin_logs(admin_api, mintime)
for event in events:
last_ts = event['timestamp']
sentinel.send(event)
sentinel.flush()
if last_ts:
logging.info('Saving admin logs last timestamp {}'.format(last_ts))
state_manager.post(str(last_ts))
while len(events) == limit:
mintime = last_ts
mintime += 1
logging.info('Making administrator logs request: mintime={}'.format(mintime))
try:
events = admin_api.get_administrator_log(mintime)
except Exception as ex:
logging.info('Error while getting administrator logs- {}'.format(ex))
if ex.status == 429:
logging.info('429 exception occurred, trying retry after 60 seconds')
time.sleep(60)
events = admin_api.get_administrator_log(mintime)
if(events is not None):
logging.info('Obtained {} admin events'.format(len(events)))
else:
logging.info('Events returned as null in administrator logs')
for event in events:
last_ts = event['timestamp']
sentinel.send(event)
sentinel.flush()
if last_ts:
logging.info('Saving admin logs last timestamp {}'.format(last_ts))
state_manager.post(str(last_ts))
if check_if_script_runs_too_long(start_ts):
logging.info('Script is running too long. Saving progress and exit.')
return
def get_admin_logs(admin_api: duo_client.Admin, mintime: int) -> Iterable[dict]:
limit = 1000
logging.info('Making administrator logs request: mintime={}'.format(mintime))
try:
events = admin_api.get_administrator_log(mintime)
except Exception as err:
logging.info('Error while getting administrator logs- {}'.format(err))
if err.status == 429:
logging.info('429 exception occurred, trying retry after 60 seconds')
time.sleep(60)
events = admin_api.get_administrator_log(mintime)
if(events is not None):
logging.info('Obtained {} admin events'.format(len(events)))
else:
logging.info('Error while getting administrator logs')
events = None
return events
def process_tele_logs(admin_api: duo_client.Admin, start_ts, state_manager: StateManager, sentinel: AzureSentinelConnector) -> None:
limit = 1000
logging.info('Start processing telephony logs')
logging.info('Getting last timestamp')
mintime = state_manager.get()
if mintime:
logging.info('Last timestamp is {}'.format(mintime))
mintime = int(mintime) + 1
else:
logging.info('Last timestamp is not known. Getting data for last 24h')
mintime = int(time.time() - 86400)
last_ts = None
events = get_tele_logs(admin_api, mintime)
for event in events:
last_ts = event['timestamp']
sentinel.send(event)
sentinel.flush()
if last_ts:
logging.info('Saving telephony logs last timestamp {}'.format(last_ts))
state_manager.post(str(last_ts))
while len(events) == limit:
mintime = last_ts
mintime += 1
logging.info('Making telephony logs request: mintime={}'.format(mintime))
try:
events = admin_api.get_telephony_log(mintime)
except Exception as ex:
logging.info('Error while getting telephony logs - {}'.format(ex))
if ex.status == 429:
logging.info('429 exception occurred, trying retry after 60 seconds')
time.sleep(60)
events = admin_api.get_telephony_log(mintime)
if(events is not None):
logging.info('Obtained {} tele events'.format(len(events)))
else:
logging.info('Events returned as null in telephony logs')
for event in events:
last_ts = event['timestamp']
sentinel.send(event)
sentinel.flush()
if last_ts:
logging.info('Saving telephony logs last timestamp {}'.format(last_ts))
state_manager.post(str(last_ts))
if check_if_script_runs_too_long(start_ts):
logging.info('Script is running too long. Saving progress and exit.')
return
def get_tele_logs(admin_api: duo_client.Admin, mintime: int) -> Iterable[dict]:
limit = 1000
logging.info('Making telephony logs request: mintime={}'.format(mintime))
try:
events = admin_api.get_telephony_log(mintime)
except Exception as err:
logging.info('Error while getting telephony logs - {}'.format(err))
if err.status == 429:
logging.info('429 exception occurred, trying retry after 60 seconds')
time.sleep(60)
events = admin_api.get_telephony_log(mintime)
if(events is not None):
logging.info('Obtained {} tele events'.format(len(events)))
else:
logging.info('Error while getting telephony logs')
events = None
return events
def process_offline_enrollment_logs(admin_api: duo_client.Admin, start_ts, state_manager: StateManager, sentinel: AzureSentinelConnector) -> None:
limit = 1000
logging.info('Start processing offline_enrollment logs')
logging.info('Getting last timestamp')
mintime = state_manager.get()
if mintime:
logging.info('Last timestamp is {}'.format(mintime))
mintime = int(mintime) + 1
else:
logging.info('Last timestamp is not known. Getting data for last 24h')
mintime = int(time.time() - 86400)
last_ts = None
events = get_offline_enrollment_logs(admin_api, mintime)
for event in events:
last_ts = event['timestamp']
sentinel.send(event)
sentinel.flush()
if last_ts:
logging.info('Saving offline_enrollment logs last timestamp {}'.format(last_ts))
state_manager.post(str(last_ts))
while len(events) == limit:
mintime = last_ts
mintime += 1
logging.info('Making offline_enrollment logs request: mintime={}'.format(mintime))
try:
events = make_offline_enrollment_logs_request(admin_api, mintime)
except Exception as ex:
logging.info('Error while getting offline_enrollment logs - {}'.format(ex))
if ex.status == 429:
logging.info('429 exception occurred, trying retry after 60 seconds')
time.sleep(60)
events = make_offline_enrollment_logs_request(admin_api, mintime)
if(events is not None):
logging.info('Obtained {} offline_enrollment events'.format(len(events)))
else:
logging.info('Events returned as null in offline_enrollment logs')
for event in events:
last_ts = event['timestamp']
sentinel.send(event)
sentinel.flush()
if last_ts:
logging.info('Saving offline_enrollment logs last timestamp {}'.format(last_ts))
state_manager.post(str(last_ts))
if check_if_script_runs_too_long(start_ts):
logging.info('Script is running too long. Saving progress and exit.')
return
def get_offline_enrollment_logs(admin_api: duo_client.Admin, mintime: int) -> Iterable[dict]:
limit = 1000
logging.info('Making offline_enrollment logs request: mintime={}'.format(mintime))
try:
events = make_offline_enrollment_logs_request(admin_api, mintime)
except Exception as err:
logging.info('Error while getting offline_enrollment logs- {}'.format(err))
if err.status == 429:
logging.info('429 exception occurred, trying retry after 60 seconds')
time.sleep(60)
events = make_offline_enrollment_logs_request(admin_api, mintime)
if(events is not None):
logging.info('Obtained {} offline_enrollment events'.format(len(events)))
else:
logging.info('Error while getting offline_enrollment logs')
events = None
return events
def make_offline_enrollment_logs_request(admin_api: duo_client.Admin, mintime) -> List[dict]:
mintime = str(int(mintime))
params = {
'mintime': mintime,
}
response = admin_api.json_api_call(
'GET',
'/admin/v1/logs/offline_enrollment',
params,
)
return response
def check_if_script_runs_too_long(start_ts):
now = int(time.time())
duration = now - start_ts
max_duration = int(MAX_SCRIPT_EXEC_TIME_MINUTES * 60 * 0.85)
return duration > max_duration